{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "seven-remainder",
   "metadata": {},
   "source": [
    "# I-SAT demo using the ICESat-2 orbit dataset and Sentinel-2 metadata\n",
    "# I-SAT has independent spatial join and temporal filtering steps\n",
    "## This demo can be used to find the spatio-temporal intersection between the two datasets, and the potential areas of interest, at hour level"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "impossible-tribe",
   "metadata": {},
   "source": [
    "## Please input the date range for the query; this demo includes the Sentinel-2 dataset for 2020"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "ready-facial",
   "metadata": {},
   "outputs": [],
   "source": [
    "### Test: spatial join first (considering the hour delay), then temporal join"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "angry-environment",
   "metadata": {},
   "outputs": [],
   "source": [
    "# dates in the format \"yyyy-MM-dd\"; results include both the start date and the end date\n",
    "\n",
    "# The hour delay variable represents how many hours apart the two datasets are considered to be temporally intersected\n",
    "# E.g. if hour_delay = 6, for 2020-06-26 12:00:00, timestamps between 2020-06-26 06:00:00 and 2020-06-26 18:00:00 are considered to be temporally intersected\n",
    "start_date = '2020-01-01'\n",
    "end_date = '2020-03-31'\n",
    "duration_month = 3 # for export\n",
    "\n",
    "hour_delay = 6"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "certain-survivor",
   "metadata": {},
   "source": [
    "## Please input the potential area of interest (AOI); this demo includes \"Beaufort_Sea\" and \"Wandel_Sea\"\n",
    "## If not using AOIs, leave this blank"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "romantic-white",
   "metadata": {},
   "outputs": [],
   "source": [
    "Paoi = 'Wandel_Sea'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "organizational-default",
   "metadata": {},
   "source": [
    "## First, set up the Spark and Sedona environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "still-album",
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark\n",
    "#findspark.init() "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "desperate-suggestion",
   "metadata": {},
   "outputs": [],
   "source": [
    "SPARK_HOME='/opt/cloudera/parcels/CDH/lib/spark'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "liquid-print",
   "metadata": {},
   "outputs": [],
   "source": [
    "# make the pyspark of the chosen Spark installation importable in this kernel\n",
    "findspark.init(spark_home=SPARK_HOME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "comparable-easter",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/zhangp/.conda/envs/py37_environment/lib/python3.7/site-packages/geopandas/_compat.py:58: UserWarning: The installed version of PyGEOS is too old (0.6 installed, 0.8 required), and thus GeoPandas will not use PyGEOS.\n",
      "  UserWarning,\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "import os\n",
    "import codecs\n",
    "import subprocess\n",
    "#from hdfs import InsecureClient\n",
    "import numpy as np\n",
    "#from pyspark import SparkContext\n",
    "from pyspark import SQLContext\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql.types import *\n",
    "import rtree\n",
    "from pyspark.sql import Window\n",
    "#import igraph\n",
    "#from igraph import Graph\n",
    "import geofeather"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "preceding-success",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark import StorageLevel\n",
    "import geopandas as gpd\n",
    "import pandas as pd\n",
    "from pyspark.sql.types import StructType\n",
    "from pyspark.sql.types import StructField\n",
    "from pyspark.sql.types import StringType\n",
    "from pyspark.sql.types import LongType\n",
    "from shapely.geometry import Point\n",
    "from shapely.geometry import Polygon\n",
    "\n",
    "from sedona.register import SedonaRegistrator\n",
    "from sedona.core.SpatialRDD import SpatialRDD\n",
    "from sedona.core.SpatialRDD import PointRDD\n",
    "from sedona.core.SpatialRDD import PolygonRDD\n",
    "from sedona.core.SpatialRDD import LineStringRDD\n",
    "from sedona.core.enums import FileDataSplitter\n",
    "from sedona.utils.adapter import Adapter\n",
    "from sedona.core.spatialOperator import KNNQuery\n",
    "from sedona.core.spatialOperator import JoinQuery\n",
    "from sedona.core.spatialOperator import JoinQueryRaw\n",
    "from sedona.core.spatialOperator import RangeQuery\n",
    "from sedona.core.spatialOperator import RangeQueryRaw\n",
    "from sedona.core.formatMapper.shapefileParser import ShapefileReader\n",
    "from sedona.core.formatMapper import WkbReader\n",
    "from sedona.core.formatMapper import WktReader\n",
    "from sedona.core.formatMapper import GeoJsonReader\n",
    "from sedona.sql.types import GeometryType\n",
    "from sedona.core.enums import GridType\n",
    "from sedona.core.SpatialRDD import RectangleRDD\n",
    "from sedona.core.enums import IndexType\n",
    "from sedona.core.geom.envelope import Envelope\n",
    "from sedona.utils import SedonaKryoRegistrator, KryoSerializer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "environmental-discharge",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Python interpreter from the shipped `environment` archive (see spark.yarn.dist.archives) and the YARN client config\n",
    "os.environ.update({\n",
    "    'PYSPARK_PYTHON': \"./environment/bin/python\",\n",
    "    'YARN_CONF_DIR': \"/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf\",\n",
    "})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "dietary-antibody",
   "metadata": {},
   "outputs": [],
   "source": [
    "# YARN-backed session using Sedona's Kryo serializer/registrator, with the Sedona and GeoTools jars on the classpath\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"test_test_1\")\n",
    "    .master('yarn')\n",
    "    .config(\"spark.serializer\", KryoSerializer.getName)\n",
    "    .config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName)\n",
    "    .config('spark.jars','sedona-core-2.4_2.11-1.0.0-incubating.jar,sedona-sql-2.4_2.11-1.0.0-incubating.jar,sedona-python-adapter-2.4_2.11-1.0.0-incubating.jar,sedona-viz-2.4_2.11-1.0.0-incubating.jar,geotools-wrapper-geotools-24.0.jar')\n",
    "    .config('spark.executor.memory', '20g')\n",
    "    .config('spark.driver.memory', '10g')\n",
    "    .config('spark.sql.shuffle.partitions', 6144)\n",
    "    .config('spark.executor.instances', '24')\n",
    "    .config('spark.executor.cores', '5')\n",
    "    .config('spark.rpc.message.maxSize', '1024')\n",
    "    .config('spark.yarn.dist.archives', 'environment.tar.gz#environment')\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "intellectual-coating",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# register Sedona's ST_* SQL functions (e.g. ST_GeomFromWKT, used below) on this session\n",
    "SedonaRegistrator.registerAll(spark)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "expected-planner",
   "metadata": {},
   "source": [
    "## Second, read datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "spatial-export",
   "metadata": {},
   "source": [
    "### Read ICESat-2 orbits at hour level, 55935 orbits in total\n",
    "### Read the ICESat-2 data. Spark supports several input sources; here we read from the Hadoop file system (HDFS), because our Spark 2.4 deployment runs on Hadoop YARN"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "following-rocket",
   "metadata": {},
   "outputs": [],
   "source": [
    "# CSV with a header row; column types are inferred from the data\n",
    "is2_df_raw = spark.read.options(header=True, inferSchema=True).csv(\"ICE_Sat_hour_cycle_orbits_split\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "spiritual-medicare",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "79090"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# raw orbit rows, before any date filtering\n",
    "is2_df_raw.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "superb-diagram",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<function __main__.orbit_date(s)>"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# extract collecting time of the orbit from filename, only at level of day for temporal granularity for now\n",
    "from datetime import datetime\n",
    "\n",
    "def orbit_date(s):\n",
    "    \"\"\"Return the date embedded in an orbit filename, reformatted as 'YYYY-MM-DD'.\n",
    "\n",
    "    The date is taken from the 11 characters ending 8 characters before the end of `s`\n",
    "    and parsed with the format '%d-%b-%Y' (e.g. '23-Dec-2019').\n",
    "    Returns None for a null input, so Spark rows with a missing value yield null\n",
    "    instead of failing the task.\n",
    "    \"\"\"\n",
    "    # local import: a later cell runs `import datetime`, which rebinds the global name\n",
    "    # `datetime` to the module and would otherwise break this function\n",
    "    from datetime import datetime as _datetime\n",
    "    if s is None:\n",
    "        return None\n",
    "    length = len(s)\n",
    "    date_str = s[(length - 19):(length - 8)]\n",
    "    date_obj = _datetime.strptime(date_str, '%d-%b-%Y')\n",
    "    return date_obj.strftime(\"%Y-%m-%d\")\n",
    "\n",
    "# Add date UDF\n",
    "spark.udf.register(\"orbit_date\", orbit_date)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "designed-freeware",
   "metadata": {},
   "outputs": [],
   "source": [
    "# is2_df_raw.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "incorporated-extra",
   "metadata": {},
   "outputs": [],
   "source": [
    "# hour-level timestamp of each orbit row = date_day (midnight) + day_hour hours\n",
    "hour_offset_seconds = F.col(\"day_hour\") * 3600\n",
    "is2_df_raw = is2_df_raw.withColumn(\n",
    "    'date_date_type',\n",
    "    (F.unix_timestamp(\"date_day\") + hour_offset_seconds).cast('timestamp'),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "monetary-forty",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "|           date_day|      split_id|day_key|day_cycle|     day_exact_time|day_hour|         Description|     formed_line_WKT|     date_date_type|\n",
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "|2019-12-23 00:00:00|            -1|   1330|        5|2019-12-23 00:00:08|       0|RGT 1330 23-Dec-2...|LINESTRING (106.7...|2019-12-23 00:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1330|        5|2019-12-23 01:00:08|       1|RGT 1330 23-Dec-2...|LINESTRING (-92.1...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|45603962748928|   1331|        5|2019-12-23 01:04:25|       1|RGT 1331 23-Dec-2...|LINESTRING (-93.8...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1331|        5|2019-12-23 01:28:25|       1|RGT 1331 23-Dec-2...|LINESTRING (127.8...|2019-12-23 01:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1331|        5|2019-12-23 02:00:25|       2|RGT 1331 23-Dec-2...|LINESTRING (70.78...|2019-12-23 02:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 02:38:43|       2|RGT 1332 23-Dec-2...|LINESTRING (-117....|2019-12-23 02:00:00|\n",
      "|2019-12-23 00:00:00|30537217474562|   1332|        5|2019-12-23 03:00:43|       3|RGT 1332 23-Dec-2...|LINESTRING (-141....|2019-12-23 03:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 03:02:43|       3|RGT 1332 23-Dec-2...|LINESTRING (103.6...|2019-12-23 03:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1332|        5|2019-12-23 04:00:43|       4|RGT 1332 23-Dec-2...|LINESTRING (-135....|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|12360915877888|   1333|        5|2019-12-23 04:13:00|       4|RGT 1333 23-Dec-2...|LINESTRING (-141....|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1333|        5|2019-12-23 04:36:00|       4|RGT 1333 23-Dec-2...|LINESTRING (168.0...|2019-12-23 04:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1333|        5|2019-12-23 05:00:00|       5|RGT 1333 23-Dec-2...|LINESTRING (27.29...|2019-12-23 05:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 05:47:17|       5|RGT 1334 23-Dec-2...|LINESTRING (-164....|2019-12-23 05:00:00|\n",
      "|2019-12-23 00:00:00|34024730918912|   1334|        5|2019-12-23 06:00:17|       6|RGT 1334 23-Dec-2...|LINESTRING (-170....|2019-12-23 06:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 06:08:17|       6|RGT 1334 23-Dec-2...|LINESTRING (178.4...|2019-12-23 06:00:00|\n",
      "|2019-12-23 00:00:00|17961553231872|   1334|        5|2019-12-23 07:00:17|       7|RGT 1334 23-Dec-2...|LINESTRING (-170....|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1334|        5|2019-12-23 07:05:17|       7|RGT 1334 23-Dec-2...|LINESTRING (179.5...|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1335|        5|2019-12-23 07:21:35|       7|RGT 1335 23-Dec-2...|LINESTRING (171.5...|2019-12-23 07:00:00|\n",
      "|2019-12-23 00:00:00|42245298323457|   1335|        5|2019-12-23 08:00:35|       8|RGT 1335 23-Dec-2...|LINESTRING (-16.8...|2019-12-23 08:00:00|\n",
      "|2019-12-23 00:00:00|            -1|   1335|        5|2019-12-23 08:33:35|       8|RGT 1335 23-Dec-2...|LINESTRING (175.0...|2019-12-23 08:00:00|\n",
      "+-------------------+--------------+-------+---------+-------------------+--------+--------------------+--------------------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# preview the orbits with the derived hour-level date_date_type column\n",
    "is2_df_raw.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "joint-senator",
   "metadata": {},
   "outputs": [],
   "source": [
    "# filter date range: keep orbits from start_date 00:00:00 through the whole of end_date.\n",
    "# Comparing the timestamp column against the bare end_date string would cast it to\n",
    "# end_date 00:00:00 and drop the rest of that day, so use an exclusive next-day bound.\n",
    "is2_end_exclusive = F.date_add(F.to_date(F.lit(end_date)), 1).cast('timestamp')\n",
    "is2_df_raw = is2_df_raw.filter(F.col('date_date_type') < is2_end_exclusive).filter(F.col('date_date_type') >= F.lit(start_date))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "complete-symposium",
   "metadata": {},
   "outputs": [],
   "source": [
    "# print(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "spectacular-jonathan",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4929"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# orbit rows remaining after the date-range filter\n",
    "is2_df_raw.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "broadband-provincial",
   "metadata": {},
   "outputs": [],
   "source": [
    "# expose the filtered orbits to Spark SQL for the ST_GeomFromWKT query\n",
    "is2_df_raw.createOrReplaceTempView(\"is2_df_raw\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "dietary-discussion",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract Spatial information from the WKT column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "saved-particle",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|               orbit|day_key|day_cycle|     date_date_type|     day_exact_time|day_hour|         Description|is_timestamp|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|LINESTRING (-124....|    935|        6|2020-02-26 00:00:00|2020-02-26 00:00:15|       0|RGT 935 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (-124....|    936|        6|2020-02-26 00:00:00|2020-02-26 00:00:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (97.63...|    936|        6|2020-02-26 00:00:00|2020-02-26 00:24:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (38.44...|    936|        6|2020-02-26 01:00:00|2020-02-26 01:00:32|       1|RGT 936 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (-147....|    937|        6|2020-02-26 01:00:00|2020-02-26 01:34:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (161.0...|    937|        6|2020-02-26 01:00:00|2020-02-26 01:57:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (37.46...|    937|        6|2020-02-26 02:00:00|2020-02-26 02:00:50|       2|RGT 937 26-Feb-20...|  1582700400|\n",
      "|LINESTRING (-168....|    937|        6|2020-02-26 03:00:00|2020-02-26 03:00:50|       3|RGT 937 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (-171....|    938|        6|2020-02-26 03:00:00|2020-02-26 03:09:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (178.8...|    938|        6|2020-02-26 03:00:00|2020-02-26 03:27:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# parse each orbit's WKT into a LineString geometry with ST_GeomFromWKT and keep only the needed columns\n",
    "orbit_query = \"select ST_GeomFromWKT(formed_line_WKT) as orbit, day_key, day_cycle, date_date_type, day_exact_time, day_hour, Description from is2_df_raw\"\n",
    "\n",
    "# add the hour timestamp as epoch seconds for the numeric time-window filters\n",
    "is2_df = spark.sql(orbit_query).withColumn('is_timestamp', F.unix_timestamp(F.col('date_date_type')))\n",
    "is2_df.createOrReplaceTempView(\"is2_df\")\n",
    "\n",
    "## Preview the first 10 rows of the table\n",
    "spark.table(\"is2_df\").show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "ordinary-france",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract data within potential time slots.\n",
    "# The window runs from start_date 00:00:00 to the last second of end_date, padded by\n",
    "# hour_delay hours on each side, so the `<=` / `>=` filters include the whole end date.\n",
    "# time.mktime interprets the naive datetimes in the driver's local time zone; this assumes\n",
    "# it matches the Spark session time zone used by F.unix_timestamp (TODO confirm).\n",
    "import time\n",
    "import datetime  # NOTE: rebinds `datetime` to the module, shadowing the earlier `from datetime import datetime`\n",
    "\n",
    "start_dt = datetime.datetime.strptime(start_date, \"%Y-%m-%d\")\n",
    "# last second of end_date (using midnight of end_date would drop almost the whole day)\n",
    "end_dt = datetime.datetime.strptime(end_date, \"%Y-%m-%d\") + datetime.timedelta(days=1, seconds=-1)\n",
    "\n",
    "start_timestamp = time.mktime(start_dt.timetuple()) - hour_delay * 3600  # for potential time delay\n",
    "end_timestamp = time.mktime(end_dt.timetuple()) + hour_delay * 3600  # for potential time delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "infinite-offset",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|               orbit|day_key|day_cycle|     date_date_type|     day_exact_time|day_hour|         Description|is_timestamp|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "|LINESTRING (-124....|    935|        6|2020-02-26 00:00:00|2020-02-26 00:00:15|       0|RGT 935 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (-124....|    936|        6|2020-02-26 00:00:00|2020-02-26 00:00:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (97.63...|    936|        6|2020-02-26 00:00:00|2020-02-26 00:24:32|       0|RGT 936 26-Feb-20...|  1582693200|\n",
      "|LINESTRING (38.44...|    936|        6|2020-02-26 01:00:00|2020-02-26 01:00:32|       1|RGT 936 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (-147....|    937|        6|2020-02-26 01:00:00|2020-02-26 01:34:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (161.0...|    937|        6|2020-02-26 01:00:00|2020-02-26 01:57:50|       1|RGT 937 26-Feb-20...|  1582696800|\n",
      "|LINESTRING (37.46...|    937|        6|2020-02-26 02:00:00|2020-02-26 02:00:50|       2|RGT 937 26-Feb-20...|  1582700400|\n",
      "|LINESTRING (-168....|    937|        6|2020-02-26 03:00:00|2020-02-26 03:00:50|       3|RGT 937 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (-171....|    938|        6|2020-02-26 03:00:00|2020-02-26 03:09:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "|LINESTRING (178.8...|    938|        6|2020-02-26 03:00:00|2020-02-26 03:27:07|       3|RGT 938 26-Feb-20...|  1582704000|\n",
      "+--------------------+-------+---------+-------------------+-------------------+--------+--------------------+------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# keep orbits whose hour timestamp lies in the padded [start_timestamp, end_timestamp] window (inclusive)\n",
    "in_window = F.col('is_timestamp').between(start_timestamp, end_timestamp)\n",
    "is2_df = is2_df.filter(in_window)\n",
    "is2_df.createOrReplaceTempView(\"is2_df\")\n",
    "\n",
    "## Preview the first 10 rows of the table\n",
    "spark.table(\"is2_df\").show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "overhead-shame",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "interracial-tourist",
   "metadata": {},
   "source": [
    "### Read Sentinel-2 data for the spatial join\n",
    "The file contains Sentinel-2 metadata collected over the year 2020, 2052387 records in total.\n",
    "meta_s2_2020_wkt.csv is a CSV file with `|` as the separator, and the footprint is in WKT format.\n",
    "The dataset is also read from the Hadoop file system.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "valued-throw",
   "metadata": {},
   "outputs": [],
   "source": [
    "# pipe-delimited CSV with a header row; without inferSchema every column is read as a string\n",
    "sentinel_df = spark.read.options(header=True, delimiter='|').csv(\"meta_s2_2020_wkt.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "powerful-option",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# inspect the column names and types of the raw Sentinel-2 metadata\n",
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "prerequisite-northern",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "stock-currency",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+--------------------+\n",
      "|_c0|            s2_index|           timestamp|             wkt_geo|\n",
      "+---+--------------------+--------------------+--------------------+\n",
      "|  0|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((166.768...|\n",
      "|  1|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.185...|\n",
      "|  2|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.144...|\n",
      "|  3|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.663...|\n",
      "|  4|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((167.573...|\n",
      "|  5|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.518...|\n",
      "|  6|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.588...|\n",
      "|  7|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.555...|\n",
      "|  8|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.641...|\n",
      "|  9|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.471...|\n",
      "| 10|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.437...|\n",
      "| 11|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((170.575...|\n",
      "| 12|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.494...|\n",
      "| 13|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((171.457...|\n",
      "| 14|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((171.989...|\n",
      "| 15|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((170.999...|\n",
      "| 16|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.559...|\n",
      "| 17|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.524...|\n",
      "| 18|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.494...|\n",
      "| 19|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((169.425...|\n",
      "+---+--------------------+--------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# preview the raw Sentinel-2 metadata rows\n",
    "sentinel_df.show(n=20)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "rapid-synthesis",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Parse the 'timestamp' string into a Spark timestamp with built-in column functions:\n",
    "# characters 1-10 (date) + ' ' + characters 12-19 (time); the 11th character (the 'T'\n",
    "# separator) is replaced and anything after the 19th character is dropped.\n",
    "# Built-ins avoid a Python UDF round-trip for every row and yield null for a null\n",
    "# 'timestamp' instead of raising TypeError inside the UDF.\n",
    "sentinel_df = sentinel_df.withColumn(\n",
    "    'date_date_type',\n",
    "    F.to_timestamp(\n",
    "        F.concat(F.substring('timestamp', 1, 10), F.lit(' '), F.substring('timestamp', 12, 8)),\n",
    "        'yyyy-MM-dd HH:mm:ss',\n",
    "    ),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "freelance-musician",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "|_c0|            s2_index|           timestamp|             wkt_geo|     date_date_type|\n",
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "|  0|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((166.768...|2020-01-01 00:01:29|\n",
      "|  1|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.185...|2020-01-01 00:01:20|\n",
      "|  2|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((167.144...|2020-01-01 00:01:05|\n",
      "|  3|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.663...|2020-01-01 00:01:27|\n",
      "|  4|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((167.573...|2020-01-01 00:00:51|\n",
      "|  5|20191231T235959_2...|2020-01-01T00:00:...|POLYGON ((168.518...|2020-01-01 00:00:38|\n",
      "|  6|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.588...|2020-01-01 00:01:17|\n",
      "|  7|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.555...|2020-01-01 00:01:02|\n",
      "|  8|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.641...|2020-01-01 00:01:27|\n",
      "|  9|20191231T235959_2...|2020-01-01T00:01:...|POLYGON ((169.471...|2020-01-01 00:01:12|\n",
      "+---+--------------------+--------------------+--------------------+-------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# compare the parsed date_date_type column with the raw timestamp string\n",
    "sentinel_df.show(n=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "chemical-insert",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "id": "governing-apache",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# date_date_type should now appear as a timestamp column\n",
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "id": "affecting-helicopter",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "2052387"
      ]
     },
     "execution_count": 32,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# total number of Sentinel-2 metadata records, before any date filtering\n",
    "sentinel_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "id": "overall-forwarding",
   "metadata": {},
   "outputs": [],
   "source": [
    "# extract spatial information using WKT format footprint\n",
    "sentinel_df.createOrReplaceTempView(\"sentinel_df\")\n",
    "sentinel_sedona = spark.sql(\"select ST_GeomFromWKT(sentinel_df.wkt_geo) as geometry, sentinel_df.s2_index as s2_index, sentinel_df.date_date_type as date_date_type from sentinel_df\")\n",
    "# rename before registering the view, so the SQL view and the DataFrame expose the same schema\n",
    "# (presumably the prefix avoids a clash with is2_df.date_date_type in the later join)\n",
    "sentinel_sedona = sentinel_sedona.withColumnRenamed('date_date_type', 'Sentinel_date_date_type')\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "id": "forty-speech",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Extract data within potential time slots: keep scenes whose epoch seconds lie in [start_timestamp, end_timestamp]\n",
    "sentinel_epoch = F.unix_timestamp(\"Sentinel_date_date_type\")\n",
    "sentinel_sedona = sentinel_sedona.filter(sentinel_epoch.between(start_timestamp, end_timestamp))\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "\n",
    "sentinel_sedona.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "welcome-presence",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "waiting-timothy",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "individual-turner",
   "metadata": {},
   "source": [
    "## Third, spatio-temporal join of the Sentinel-2 and ICESat-2 datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "id": "mighty-malawi",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 3.1 Join the two Sedona DataFrames on their timestamps (hour level for now), allowing for the time delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "id": "material-premiere",
   "metadata": {},
   "outputs": [],
   "source": [
    "# [min, max] epoch-second window around each Sentinel-2 acquisition, +/- hour_delay hours\n",
    "delay_seconds = hour_delay * 3600\n",
    "sentinel_epoch = F.unix_timestamp(\"Sentinel_date_date_type\")\n",
    "sentinel_sedona = (\n",
    "    sentinel_sedona\n",
    "    .withColumn('sentinel_max_timestamp', sentinel_epoch + delay_seconds)\n",
    "    .withColumn('sentinel_min_timestamp', sentinel_epoch - delay_seconds)\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "id": "color-riverside",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|sentinel_max_timestamp|sentinel_min_timestamp|\n",
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "|POLYGON ((166.768...|20191231T235959_2...|    2020-01-01 00:01:29|            1577876489|            1577833289|\n",
      "|POLYGON ((167.185...|20191231T235959_2...|    2020-01-01 00:01:20|            1577876480|            1577833280|\n",
      "|POLYGON ((167.144...|20191231T235959_2...|    2020-01-01 00:01:05|            1577876465|            1577833265|\n",
      "|POLYGON ((169.663...|20191231T235959_2...|    2020-01-01 00:01:27|            1577876487|            1577833287|\n",
      "|POLYGON ((167.573...|20191231T235959_2...|    2020-01-01 00:00:51|            1577876451|            1577833251|\n",
      "|POLYGON ((168.518...|20191231T235959_2...|    2020-01-01 00:00:38|            1577876438|            1577833238|\n",
      "|POLYGON ((169.588...|20191231T235959_2...|    2020-01-01 00:01:17|            1577876477|            1577833277|\n",
      "|POLYGON ((169.555...|20191231T235959_2...|    2020-01-01 00:01:02|            1577876462|            1577833262|\n",
      "|POLYGON ((169.641...|20191231T235959_2...|    2020-01-01 00:01:27|            1577876487|            1577833287|\n",
      "|POLYGON ((169.471...|20191231T235959_2...|    2020-01-01 00:01:12|            1577876472|            1577833272|\n",
      "|POLYGON ((169.437...|20191231T235959_2...|    2020-01-01 00:00:58|            1577876458|            1577833258|\n",
      "|POLYGON ((170.575...|20191231T235959_2...|    2020-01-01 00:01:30|            1577876490|            1577833290|\n",
      "|POLYGON ((169.494...|20191231T235959_2...|    2020-01-01 00:01:25|            1577876485|            1577833285|\n",
      "|POLYGON ((171.457...|20191231T235959_2...|    2020-01-01 00:01:07|            1577876467|            1577833267|\n",
      "|POLYGON ((171.989...|20191231T235959_2...|    2020-01-01 00:00:53|            1577876453|            1577833253|\n",
      "|POLYGON ((170.999...|20191231T235959_2...|    2020-01-01 00:01:19|            1577876479|            1577833279|\n",
      "|POLYGON ((169.559...|20191231T235959_2...|    2020-01-01 00:00:48|            1577876448|            1577833248|\n",
      "|POLYGON ((169.524...|20191231T235959_2...|    2020-01-01 00:00:34|            1577876434|            1577833234|\n",
      "|POLYGON ((168.494...|20191231T235959_2...|    2020-01-01 00:00:19|            1577876419|            1577833219|\n",
      "|POLYGON ((169.425...|20191231T235959_2...|    2020-01-01 00:00:08|            1577876408|            1577833208|\n",
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Preview the first rows together with the new timestamp-window columns\n",
    "sentinel_sedona.show(n=20)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "preceding-correction",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- geometry: geometry (nullable = false)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- Sentinel_date_date_type: timestamp (nullable = true)\n",
      " |-- sentinel_max_timestamp: long (nullable = true)\n",
      " |-- sentinel_min_timestamp: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Confirm the two window columns were added as long (Unix seconds)\n",
    "sentinel_sedona.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "id": "heated-plastic",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): convert to a GeoPandas GeoDataFrame for a non-Spark comparison\n",
    "# sentinel_sedona_gpd = gpd.GeoDataFrame(sentinel_sedona.toPandas(), geometry='geometry')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "id": "distant-modeling",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): display the GeoPandas frame\n",
    "# sentinel_sedona_gpd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "id": "removable-investor",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): datetime64 (presumably nanosecond resolution) -> Unix seconds.\n",
    "# `np.int` was removed in NumPy 1.24, so cast to an explicit int64 dtype instead.\n",
    "# sentinel_sedona_gpd['Sentinel_date_unix'] = sentinel_sedona_gpd['Sentinel_date_date_type'].astype('int64') // 10**9\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "id": "quality-negotiation",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): copy the timestamp columns into shorter names for the shapefile export.\n",
    "# NOTE(review): Shapefile (DBF) field names are limited to 10 characters, so 'S2_time_max' /\n",
    "# 'S2_time_min' (11 chars) would presumably be truncated by the writer - confirm.\n",
    "# sentinel_sedona_gpd['S2_unix'] = sentinel_sedona_gpd['Sentinel_date_unix']\n",
    "# sentinel_sedona_gpd['S2_time_max'] = sentinel_sedona_gpd['sentinel_max_timestamp']\n",
    "# sentinel_sedona_gpd['S2_time_min'] = sentinel_sedona_gpd['sentinel_min_timestamp']\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "id": "fresh-container",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): export the Sentinel-2 footprints to a shapefile.\n",
    "# The file name follows `duration_month` from the config cell instead of a hard-coded \"6\".\n",
    "# gpd.GeoDataFrame(sentinel_sedona_gpd[['s2_index', 'S2_unix', 'S2_time_max', 'S2_time_min', 'geometry']]).to_file('sentinel_{}_month.shp'.format(duration_month))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "id": "beneficial-commodity",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- orbit: geometry (nullable = false)\n",
      " |-- day_key: integer (nullable = true)\n",
      " |-- day_cycle: integer (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      " |-- day_exact_time: timestamp (nullable = true)\n",
      " |-- day_hour: integer (nullable = true)\n",
      " |-- Description: string (nullable = true)\n",
      " |-- is_timestamp: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Inspect the ICESat-2 orbit frame; its `is_timestamp` (long) is compared against the Sentinel-2 window below\n",
    "is2_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "id": "dietary-canberra",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): collect the ICESat-2 orbits to pandas\n",
    "# is2_df_pd = is2_df.toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "flying-continuity",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): display the collected ICESat-2 frame\n",
    "# is2_df_pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "assisted-paris",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional baseline (disabled): export the ICESat-2 orbits to a shapefile named after `duration_month`\n",
    "# gpd.GeoDataFrame(is2_df_pd[['orbit', 'Description', 'is_timestamp']], geometry='orbit').to_file('is2_{}month.shp'.format(duration_month))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "meaningful-executive",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "id": "polar-street",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "\n",
    "# Spatial step: pair each Sentinel-2 footprint with every ICESat-2 orbit it intersects.\n",
    "# NOTE(review): assumes an `is2_df` temp view was registered in an earlier cell - confirm.\n",
    "spatial_join_sql = (\n",
    "    \"SELECT * FROM sentinel_sedona, is2_df \"\n",
    "    \"WHERE ST_Intersects(sentinel_sedona.geometry, is2_df.orbit)\"\n",
    ")\n",
    "test_spatial_join = spark.sql(spatial_join_sql)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "id": "mounted-graduate",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional benchmark (disabled): time a pure spatial join by writing its result to CSV.\n",
    "# Selects `s2_index` (the Sentinel-2 key in the joined schema); there is no `uuid` column.\n",
    "# import time\n",
    "# start_spatial_time = time.time()\n",
    "# test_spatial_join.select('s2_index', 'Description', 'is_timestamp').write.format('com.databricks.spark.csv')\\\n",
    "#         .mode(\"overwrite\")\\\n",
    "#         .option('header', True)\\\n",
    "#         .save('sentinel_is2_intersect_pure_spatial')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "defined-strip",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional benchmark report (disabled); the separator is a real newline ('\\n', previously the typo '/n').\n",
    "# with open('speed_pure_spatial_join_month.txt', 'w') as f:\n",
    "#     f.write('pure spatial join time:')\n",
    "#     f.write(str(time.time() - start_spatial_time))\n",
    "#     f.write('\\n')\n",
    "#     f.write('number of spatial intersections:')\n",
    "#     f.write(str(test_spatial_join.count()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "matched-bible",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- geometry: geometry (nullable = false)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- Sentinel_date_date_type: timestamp (nullable = true)\n",
      " |-- sentinel_max_timestamp: long (nullable = true)\n",
      " |-- sentinel_min_timestamp: long (nullable = true)\n",
      " |-- orbit: geometry (nullable = false)\n",
      " |-- day_key: integer (nullable = true)\n",
      " |-- day_cycle: integer (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      " |-- day_exact_time: timestamp (nullable = true)\n",
      " |-- day_hour: integer (nullable = true)\n",
      " |-- Description: string (nullable = true)\n",
      " |-- is_timestamp: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# The joined frame carries both the Sentinel-2 window columns and the ICESat-2 `is_timestamp`\n",
    "test_spatial_join.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "id": "linear-royal",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cache the spatial-join result: the temporal filter below and the report's counts are both derived from it.\n",
    "# NOTE: per the Spark docs, cache() is lazy - the data is only materialised by the first action.\n",
    "test_spatial_join = test_spatial_join.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "id": "serious-prime",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Temporal step: keep pairs whose ICESat-2 timestamp lies inside the Sentinel-2 window (both bounds inclusive)\n",
    "test_ST_join = test_spatial_join.filter(\n",
    "    F.col('is_timestamp').between(F.col('sentinel_min_timestamp'), F.col('sentinel_max_timestamp'))\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "id": "august-scanning",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "# No earlier cell runs an action on `test_ST_join`, so this write also executes the\n",
    "# spatial join and the temporal filter; the timing therefore covers the whole I-SAT pipeline.\n",
    "start_temporal_time = time.perf_counter()\n",
    "(\n",
    "    test_ST_join\n",
    "    .select('s2_index', 'Description', 'is_timestamp')\n",
    "    .write.format('com.databricks.spark.csv')\n",
    "    .mode(\"overwrite\")\n",
    "    .option('header', True)\n",
    "    .save('sentinel_is2_intersect_spatial_temporal')\n",
    ")\n",
    "# Stop the clock in this same cell so the reported time excludes any gap before the next cell runs.\n",
    "isat_elapsed_seconds = time.perf_counter() - start_temporal_time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "id": "enormous-founder",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Summarise the run. All counts are computed before the file is opened, so a failing\n",
    "# Spark action cannot leave a half-written report behind.\n",
    "report_lines = [\n",
    "    'ISAT time in total:' + str(isat_elapsed_seconds),\n",
    "    'number of spatio-temporal intersections:' + str(test_ST_join.count()),\n",
    "    'from and to date:' + start_date + ' to ' + end_date,\n",
    "    'ICEsat2 data count:' + str(is2_df_raw.count()),\n",
    "    'Sentinel 2 data count:' + str(sentinel_sedona.count()),\n",
    "]\n",
    "with open('ISAT_join_{}_hours_delay_S2I2.txt'.format(hour_delay), 'w') as f:\n",
    "    f.write('\\n'.join(report_lines) + '\\n')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "broad-portrait",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "activated-panel",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "specified-partnership",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
